 1.Flink常用API详解之Flink DataStream常用API
   
   DataStream API主要分为3块：DataSource、Transformation、Sink
   DataSource是程序的数据源输入，可以通过
StreamExecutionEnvironment.addSource(sourceFuntion)为程序添加一个数据源
   Transformation是具体的操作，它对一个或多个输入数据源进行计算处理，比如Map、FlatMap
和Filter等操作
   Sink是程序的输出，它可以把Transformation处理之后的数据输出到指定的存储介质中。
   1).DataSource
   (1).基于文件
        readTextFile(path)

   读取文本文件，文件遵循TextInputFormat逐行读取规则并返回
   tip:本地Idea读hdfs需要：
   依赖：
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.5</version>
        </dependency>

package com.lagou.streamdatasource;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count whose source is a text file read from HDFS via
 * {@code env.readTextFile}. Demonstrates the file-based DataSource API.
 */
public class SourceFromFile {
    public static void main(String[] args) throws Exception {
        String inputHdfsPath = "hdfs://linux121:9000/hi.txt";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.readTextFile(inputHdfsPath);

        // Tokenize each line on single spaces and emit one (word, 1) pair per word.
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs =
                lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                });

        // Partition by the word itself (tuple field f0) so counts accumulate per word.
        KeyedStream<Tuple2<String, Integer>, Object> keyed =
                pairs.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> pair) throws Exception {
                        return pair.f0;
                    }
                });

        // Rolling sum over tuple field index 1 (the count), printed to stdout.
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = keyed.sum(1);
        counts.print();
        env.execute();
    }
}

   (2).基于Socket
       socketTextStream
   从Socket中读取数据，元素可以通过一个分隔符分开
   (3).基于集合
       fromCollection(Collection)
   通过Java的Collection集合创建一个数据流，集合中的所有元素必须是相同类型的
   如果满足以下条件，Flink将数据类型识别为POJO类型（并允许“按名称”字段引用）：
       该类是公有且独立的（没有非静态内部类）
       该类有公有的无参构造方法
	   类（及父类）中所有不被static、transient修饰的属性，要么是公有的（且不被final修饰），要么
是包含公有的getter和setter方法，这些方法遵循Java Bean命名规范。

package com.lagou.streamdatasource;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

/**
 * Demonstrates the collection-based DataSource: builds a stream from a Java
 * {@link ArrayList} with {@code env.fromCollection} and filters it.
 *
 * <p>The nested {@code People} class follows Flink's POJO rules (public class,
 * public no-arg constructor, getter/setter per field) so Flink can serialize it
 * as a POJO type rather than falling back to generic Kryo serialization.
 */
public class SourceFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        DataStreamSource<String> data = env.fromElements("spark", "flink");
        ArrayList<People> peopleList = new ArrayList<People>();
        peopleList.add(new People("lucas", 18));
        peopleList.add(new People("jack", 28));
        peopleList.add(new People("luck", 38));
        DataStreamSource<People> data = env.fromCollection(peopleList);

        // Keep only records whose age is strictly greater than 20.
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People people) throws Exception {
                return people.age > 20;
            }
        });
        filtered.print();
        env.execute();
    }

    /** Simple bean used as the stream element type. */
    public static class People {
        // Fields accessed via getters/setters; `age` is also read directly by the filter above.
        String name;
        Integer age;

        /** No-arg constructor required for Flink to treat this class as a POJO type. */
        public People() {
        }

        public People(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        @Override
        public String toString() {
            return "People{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}

   (4).自定义输入
   可以使用StreamExecutionEnvironment.addSource(sourceFunction)将一个流式数据源加到程序中。
   Flink提供了许多预先实现的源函数，但是也可以编写自己的自定义源，方法是为非并行源implements
SourceFunction，或者为并行源 implements ParallelSourceFunction接口，或者extends
RichParallelSourceFunction。
   Flink也提供了一批内置的Connector（连接器），如下表列了几个主要的
   连接器              是否提供Source支持   是否提供Sink支持
   Apache Kafka               是                   是
   ElasticSearch              否                   是
   HDFS                       否                   是
   Twitter Streaming API      是                   否
   Kafka连接器
   a、依赖：
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.1</version>
        </dependency>
   b、代码：
package com.lagou.streamdatasource;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Streaming word count over a Kafka topic, using the Flink Kafka connector
 * ({@link FlinkKafkaConsumer}) registered via {@code env.addSource}.
 */
public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Consume string records from the "animal" topic on the given broker.
        String topic = "animal";
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "linux122:9092");
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), props);

        DataStreamSource<String> messages = env.addSource(consumer);

        // Split each message on spaces, emitting a (word, 1) pair per token.
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs =
                messages.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String message, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = message.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                });

        // Key by the word (field f0), then keep a rolling sum of field index 1.
        KeyedStream<Tuple2<String, Integer>, Object> keyed =
                pairs.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
                    @Override
                    public Object getKey(Tuple2<String, Integer> pair) throws Exception {
                        return pair.f0;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = keyed.sum(1);
        counts.print();
        env.execute();
    }
}

   c、启动kafka
   ./kafka-server-start.sh -daemon ../config/server.properties
   d、创建topic
   bin/kafka-topics.sh  --zookeeper linux122:2181/myKafka  
--create --topic animal --partitions 1 --replication-factor 1
   e、启动控制台kafka生产者
   ./kafka-console-producer.sh --broker-list linux122:9092 --topic animal
   为非并行源implements SourceFunction，或者为并行源 implements ParallelSourceFunction接口，
或者extends RichParallelSourceFunction。
   为非并行源implements SourceFunction
package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Custom non-parallel data source (implements {@link SourceFunction}) that
 * emits an increasing counter as a string once per second until cancelled.
 */
public class SourceFromNoParalleSource implements SourceFunction<String> {
    long count = 0;
    // volatile: cancel() is called from a different thread than run(), so the
    // flag write must be visible to the emitting loop or the source never stops.
    volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(String.valueOf(count));
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Driver: attaches the non-parallel custom source and prints its output. */
public class SourceFromNoParalleSourceRun {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.addSource(new SourceFromNoParalleSource());
        stream.print();
        env.execute();
    }
}
   
   为并行源 implements ParallelSourceFunction接口
package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * Custom parallel data source (implements {@link ParallelSourceFunction}) that
 * emits an increasing counter as a string once per second until cancelled.
 */
public class SelfSourceParalle implements ParallelSourceFunction<String> {
    long count = 0;
    // volatile: cancel() runs on a different thread than run(); without it the
    // loop may never observe the updated flag.
    volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(String.valueOf(count));
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // BUG FIX: this previously set isRunning = true, which made the source
        // impossible to cancel; it must clear the flag to stop run()'s loop.
        isRunning = false;
    }
}


package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Driver: attaches the parallel custom source and prints its output. */
public class SelfSourceParalleRun {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.addSource(new SelfSourceParalle());
        stream.print();
        env.execute();
    }
}

   extends RichParallelSourceFunction
package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Custom parallel data source built by extending
 * {@link RichParallelSourceFunction}, which additionally offers open()/close()
 * lifecycle hooks and runtime context access (not used here). Emits an
 * increasing counter as a string once per second until cancelled.
 */
public class SelfSourceParalleExtendsRich extends RichParallelSourceFunction<String> {
    long count = 0;
    // volatile: cancel() is invoked from another thread; the flag write must be
    // visible to the emitting loop for cancellation to take effect.
    volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(String.valueOf(count));
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package com.lagou.streamdatasource;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Driver: attaches the rich parallel custom source and prints its output. */
public class SelfSourceRichRun {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.addSource(new SelfSourceParalleExtendsRich());
        stream.print();
        env.execute();
    }
}
   
   总结自定义数据源：flinkkafkaconnector源码初探：
   open方法：初始化
   run方法：从kafka拉取数据
   2).Transformation
   Flink针对DataStream提供了大量的已经实现的算子
   Map
   DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the
   input stream:
   DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
 @Override
 public Integer map(Integer value) throws Exception {
   return 2 * value;
 }
});
   
   FlatMap
   DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits
sentences to words:
   dataStream.flatMap(new FlatMapFunction<String, String>() {
 @Override
 public void flatMap(String value, Collector<String> out)
   throws Exception {
   for(String word: value.split(" ")){
     out.collect(word);
   }
 }
});
   
   Filter
   DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns
true. A filter that filters out zero values:
   dataStream.filter(new FilterFunction<Integer>() {
 @Override
 public boolean filter(Integer value) throws Exception {
   return value != 0;
 }
});
   
   KeyBy
   DataStream → KeyedStream
   Logically partitions a stream into disjoint partitions. All records with the same key are assigned to
the same partition. Internally, keyBy() is implemented with hash partitioning. There are different
ways to specify keys.
   This transformation returns a KeyedStream, which is, among other things, required to use keyed
state.
   dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
   dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
   Attention A type cannot be a key if:
   it is a POJO type but does not override the hashCode() method and relies on the
Object.hashCode() implementation.
   it is an array of any type.
   Reduce
   KeyedStream → DataStream
   A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced
value and emits the new value.
   A reduce function that creates a stream of partial sums:
   keyedStream.reduce(new ReduceFunction<Integer>() {
 @Override
 public Integer reduce(Integer value1, Integer value2)
 throws Exception {
   return value1 + value2;
 }
});
   
   Fold
   KeyedStream → DataStream
   A "rolling" fold on a keyed data stream with an initial value. Combines the current element with
the last folded value and emits the new value.
   A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1",
"start-1-2", "start-1-2-3", ...
   DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
 @Override
 public String fold(String current, Integer value) {
   return current + "-" + value;
 }
});

   Aggregations
   KeyedStream → DataStream
   Rolling aggregations on a keyed data stream. The difference between min and minBy is that min
returns the minimum value, whereas minBy returns the element that has the minimum value in
this field (same for max and maxBy).
   keyedStream.sum(0);
   keyedStream.sum("key");
   keyedStream.min(0);
   keyedStream.min("key");
   keyedStream.max(0);
   keyedStream.max("key");
   keyedStream.minBy(0);
   keyedStream.minBy("key");
   keyedStream.maxBy(0);
   keyedStream.maxBy("key");
   
   Window
   KeyedStream → WindowedStream
   Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See
windows for a complete description of windows.
   dataStream.keyBy(value ->
value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5
seconds of data
   
   WindowAll
   DataStream → AllWindowedStream
   Windows can be defined on regular DataStreams. Windows group all the stream events
according to some characteristic (e.g., the data that arrived within the last 5 seconds). See
windows for a complete description of windows.
   WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one
task for the windowAll operator.
   dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5
seconds of data
   
   Window Apply
   WindowedStream → DataStream
   AllWindowedStream → DataStream
   Applies a general function to the window as a whole. Below is a function that manually sums the
elements of a window.
   Note: If you are using a windowAll transformation, you need to use an AllWindowFunction
instead.
   windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple,
Window>() {
 public void apply (Tuple tuple,
     Window window,
     Iterable<Tuple2<String, Integer>> values,
     Collector<Integer> out) throws Exception {
   int sum = 0;
   for (Tuple2<String, Integer> t: values) {
     sum += t.f1;
   }
   out.collect (new Integer(sum));
 }
});
   
   // applying an AllWindowFunction on non-keyed window stream
   allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer,
Window>() {
 public void apply (Window window,
     Iterable<Tuple2<String, Integer>> values,
     Collector<Integer> out) throws Exception {
   int sum = 0;
   for (Tuple2<String, Integer> t: values) {
     sum += t.f1;
   }
   out.collect (new Integer(sum));
 }
});
   Window Reduce
   WindowedStream → DataStream
   Applies a functional reduce function to the window and returns the reduced value.
   windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
   return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
 }
});
   
   Window Fold
   WindowedStream → DataStream
   Applies a functional fold function to the window and returns the folded value. The example
function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-
4-5":
   windowedStream.fold("start", new FoldFunction<Integer, String>() {
 public String fold(String current, Integer value) {
   return current + "-" + value;
 }
});
   
   Aggregations on windows
   WindowedStream → DataStream
   Aggregates the contents of a window. The difference between min and minBy is that min returns
the minimum value, whereas minBy returns the element that has the minimum value in this field
(same for max and maxBy).
   windowedStream.sum(0);
   windowedStream.sum("key");
   windowedStream.min(0);
   windowedStream.min("key");
   windowedStream.max(0);
   windowedStream.max("key");
   windowedStream.minBy(0);
   windowedStream.minBy("key");
   windowedStream.maxBy(0);
   windowedStream.maxBy("key");
   
   Union
   DataStream → DataStream
   Union of two or more data streams creating a new stream containing all the elements from all
the streams. Note: If you union a data stream with itself you will get each element twice in the
resulting stream.
   dataStream.union(otherStream1, otherStream2, ...);
   
   Window Join
   DataStream,DataStream → DataStream
   Join two data streams on a given key and a common window.
   dataStream.join(otherStream)
 .where(<key selector>).equalTo(<key selector>)
 .window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new JoinFunction () {...});
   
   Interval Join
   KeyedStream,KeyedStream → DataStream
   Join two elements e1 and e2 of two keyed streams with a common key over a given time interval,
so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
   // this will join the two streams so that
   // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
   keyedStream.intervalJoin(otherKeyedStream)
 .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper
bound
 .upperBoundExclusive(true) // optional
 .lowerBoundExclusive(true) // optional
 .process(new IntervalJoinFunction() {...});
   
   Window CoGroup
   DataStream,DataStream → DataStream
   Cogroups two data streams on a given key and a common window.
   dataStream.coGroup(otherStream)
 .where(0).equalTo(1)
 .window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new CoGroupFunction () {...});
   
   Connect
   DataStream,DataStream → ConnectedStreams
   "Connects" two data streams retaining their types. Connect allowing for shared state between
the two streams.
   DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams =
someStream.connect(otherStream);
   
   CoMap, CoFlatMap
   ConnectedStreams → DataStream
   Similar to map and flatMap on a connected data stream
   connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
 @Override
public Boolean map1(Integer value) {
  return true;
}
@Override
public Boolean map2(String value) {
 return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
 @Override
 public void flatMap1(Integer value, Collector<String> out) {
   out.collect(value.toString());
 }
 @Override
 public void flatMap2(String value, Collector<String> out) {
   for (String word: value.split(" ")) {
    out.collect(word);
   }
 }
});

package com.lagou.streamtransformation;

import com.lagou.streamdatasource.SelfSourceParalle;
import com.lagou.streamdatasource.SelfSourceParalleExtendsRich;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Demonstrates {@code connect} + {@code CoMapFunction}: joins two independent
 * string sources into one ConnectedStreams and maps both sides through to a
 * single output stream. (Despite the class name, this is a Connect demo, not a
 * collection-source demo.)
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two custom sources, each emitting a counter string once per second.
        DataStreamSource<String> left = env.addSource(new SelfSourceParalle());
        DataStreamSource<String> right = env.addSource(new SelfSourceParalleExtendsRich());

        // Connect keeps both element types; map1/map2 handle one side each.
        ConnectedStreams<String, String> connected = left.connect(right);
        SingleOutputStreamOperator<String> merged =
                connected.map(new CoMapFunction<String, String, String>() {
                    @Override
                    public String map1(String value) throws Exception {
                        // Pass left-side elements through unchanged.
                        return value;
                    }

                    @Override
                    public String map2(String value) throws Exception {
                        // Pass right-side elements through unchanged.
                        return value;
                    }
                });

        merged.print();
        env.execute();
    }
}
   
   
   Split
   DataStream → SplitStream
   Split the stream into two or more streams according to some criterion.
   

   SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>()
{
 @Override
 public Iterable<String> select(Integer value) {
   List<String> output = new ArrayList<String>();
   if (value % 2 == 0) {
     output.add("even");
   }
   else {
     output.add("odd");
   }
   return output;
 }
});
   
   Select
   SplitStream → DataStream
   Select one or more streams from a split stream.
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

package com.lagou.streamtransformation;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Demonstrates the split/select pair: tags each integer as "even" or "odd"
 * with an OutputSelector, then selects only the "even" sub-stream.
 * NOTE(review): split/select is deprecated in Flink 1.11 in favor of side
 * outputs; kept here because this lesson covers the legacy API.
 */
public class SplitSelectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);

        // Tag every element with the name(s) of the sub-stream(s) it belongs to.
        SplitStream<Integer> tagged = numbers.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                ArrayList<String> tags = new ArrayList<>();
                tags.add(value % 2 == 0 ? "even" : "odd");
                return tags;
            }
        });

        // Pull out just the even-numbered sub-stream and print it.
        DataStream<Integer> evens = tagged.select("even");
        evens.print();
        env.execute();
    }
}
   
   Iterate
   DataStream → IterativeStream → DataStream
   Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous
operator. This is especially useful for defining algorithms that continuously update a model. The
following code starts with a stream and applies the iteration body continuously. Elements that
are greater than 0 are sent back to the feedback channel, and the rest of the elements are
forwarded downstream. See iterations for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
 @Override
 public boolean filter(Long value) throws Exception {
   return value > 0;
 }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
 @Override
 public boolean filter(Long value) throws Exception {
   return value <= 0;
 }
});
   3).Sink
   Flink针对DataStream提供了大量的已经实现的数据目的地（Sink），具体如下所示
        writeAsText()：将元素以字符串形式逐行写入，这些字符串通过调用每个元素的toString()方法来
获取
        print()/printToErr()：打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
        自定义输出：addSink可以实现把数据输出到第三方存储介质中
		Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1 节中表所示
   案例：将流数据下沉到redis中
   1、 依赖：
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
        </dependency>
   2、 关键代码：
package com.lagou.streamsink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Sinks socket text data into Redis via the Flink Redis connector: each line
 * is wrapped as ("m_word", line) and LPUSHed onto the Redis list "m_word".
 */
public class MySinkToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("linux123", 7777);

        // Wrap every incoming line under the fixed key "m_word".
        SingleOutputStreamOperator<Tuple2<String, String>> m_word = data.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<String, String>("m_word", value);
            }
        });

        // Jedis connection pool configuration for the target Redis instance.
        FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();
        builder.setHost("linux123");
        builder.setPort(6379);
//        builder.setPassword("lucas");
        FlinkJedisPoolConfig config = builder.build();

        // Parameterize the sink (was a raw type): element type is Tuple2<String, String>.
        // f0 becomes the Redis key, f1 the value, written with LPUSH.
        RedisSink<Tuple2<String, String>> redisSink =
                new RedisSink<Tuple2<String, String>>(config, new RedisMapper<Tuple2<String, String>>() {
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        return new RedisCommandDescription(RedisCommand.LPUSH);
                    }

                    @Override
                    public String getKeyFromData(Tuple2<String, String> data) {
                        return data.f0;
                    }

                    @Override
                    public String getValueFromData(Tuple2<String, String> data) {
                        return data.f1;
                    }
                });

        m_word.addSink(redisSink);
        env.execute();
    }
}
   
   案例2：将流数据下沉到redis中--自定义
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>

package com.lagou.streamsink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Custom MySQL sink: a RichSinkFunction that opens a JDBC connection in
 * open(), inserts one row per Student in invoke(), and releases JDBC
 * resources in close().
 */
public class MySinkToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final String url = "jdbc:mysql://linux123:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
        final String user = "root";
        final String password = "12345678";
        Student stu1 = new Student("lucas", 18);
        Student stu2 = new Student("jack", 28);
        DataStreamSource<Student> data = env.fromElements(stu1,stu2);
        data.addSink(new RichSinkFunction<Student>() {
            Connection connection = null;
            PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                // One connection + prepared statement per sink instance, reused for every record.
                connection = DriverManager.getConnection(url, user, password);
                String sql = "insert into student (name,age) values (?,?)";
                preparedStatement = connection.prepareStatement(sql);
            }

            @Override
            public void invoke(Student value, Context context) throws Exception {
                preparedStatement.setString(1, value.getName());
                preparedStatement.setInt(2, value.getAge());
                preparedStatement.executeUpdate();
            }

            @Override
            public void close() throws Exception {
                // BUG FIX: close the statement BEFORE the connection (the original
                // closed the connection first), and isolate the two closes so a
                // failure closing the statement cannot leak the connection.
                try {
                    if (preparedStatement != null) {
                        preparedStatement.close();
                    }
                } finally {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }
        });
        env.execute();
    }
}
   
   案例3：下沉到Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class StreamToKafka {
	public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStreamSource<String> data = env.socketTextStream("linux122", 7777);
        String brokerList = "linux122:9092";
        String topic = "animal";
		FlinkKafkaProducer producer = new FlinkKafkaProducer(brokerList, topic,new SimpleStringSchema());
		data.addSink(producer);
        env.execute();
    }
}	
	